#!/bin/bash

# Only the root user may run this installer; any other user gets a message
# and the script stops with exit code 1.
current_user="$(whoami)"
[[ "${current_user}" == "root" ]] || {
  echo "请使用root用户运行此脚本。"
  exit 1
}

# Abort on the first failing command, and make a pipeline fail when any of
# its stages fails.
set -e
set -o pipefail

# Load the shared settings file. ./common.sh in the current working directory
# is used when it exists (the historical behaviour); otherwise fall back to
# the common.sh that sits next to this script, so the installer also works
# when it is started from another directory.
# NOTE(review): common.sh presumably defines the variables used below
# (new_user, new_user_password, module_path, kafka_package_path,
# kafka_servers, zookeeper_servers, ...) — confirm against that file.
common_config="./common.sh"
if [[ ! -f "${common_config}" ]]; then
  common_config="$(dirname "${BASH_SOURCE[0]}")/common.sh"
fi
# shellcheck source=/dev/null
source "${common_config}"

# File name of the Kafka archive with everything from ".tgz" onwards cut off.
# It is used later as the source directory name when the unpacked tree is
# renamed.
kafka_archive_name=$(basename "${kafka_package_path}")
kafka_version1=$(awk -F'.tgz' '{print $1}' <<<"${kafka_archive_name}")

# Fixed directory name of the Kafka installation below ${module_path}.
kafka_version="kafka"

# Lines written to the profile script: a marker comment, KAFKA_HOME, and the
# PATH extension. $PATH and $KAFKA_HOME are kept literal so they are expanded
# when the profile is loaded, not while this script runs.
env_variables=()
env_variables+=("#KAFKA_HOME")
env_variables+=("export KAFKA_HOME=${module_path}/${kafka_version}")
env_variables+=('export PATH=$PATH:$KAFKA_HOME/bin')

# Make sure ZooKeeper is running on every ZooKeeper server before Kafka is
# installed; stop with exit code 1 at the first server where it is not.
for zk_host in "${zookeeper_servers[@]}"; do

  echo "开始检查Zookeeper状态 ${zk_host}"

  # Capture the remote process lines that mention "zookeeper". When the
  # remote pipeline finds nothing (or the SSH call itself fails), the
  # sentinel value 6666666 is captured instead.
  # NOTE(review): the pattern matches any command line containing
  # "zookeeper", which may include processes other than the ZooKeeper
  # server itself — confirm this is acceptable.
  zookeeper_status=$(
    sshpass -p "${new_user_password}" \
      ssh -o StrictHostKeyChecking=no "${new_user}@${zk_host}" \
      "ps -ef | grep zookeeper | grep -v grep" \
      || echo "6666666"
  )

  # Anything other than the sentinel means a matching process was listed.
  if [[ "${zookeeper_status}" != "6666666" ]]; then
    continue
  fi

  echo "zookeeper_status:${zookeeper_status}"
  echo "Zookeeper未在服务器${zk_host}上启动。请启动Zookeeper并重新运行此脚本。"
  exit 1
done

# Copy the Kafka archive to every Kafka server. The rsync command is executed
# on ${master_server} over SSH and pushes ${kafka_package_path} into
# ${software_directory} on the target host.
for target_host in "${kafka_servers[@]}"; do
  rsync_command="rsync -av ${kafka_package_path}  ${new_user}@${target_host}:${software_directory}"
  sshpass -p "${new_user_password}" \
    ssh -o StrictHostKeyChecking=no "${new_user}@${master_server}" \
    "${rsync_command}"
done

# Remove any previous Kafka installation (every entry whose name starts with
# "kafka" directly below ${module_path}) on each Kafka server.
for server in "${kafka_servers[@]}"; do
  # ${module_path:?} aborts the script when module_path is unset or empty;
  # without the guard the remote command would become "sudo rm -rf /kafka*".
  # NOTE(review): the glob also removes unrelated directories whose names
  # merely start with "kafka" — confirm that is intended.
  sshpass -p "${new_user_password}" ssh -o StrictHostKeyChecking=no "${new_user}@${server}" "sudo rm -rf ${module_path:?module_path 未设置或为空}/kafka*"
done

# Unpack the Kafka archive into ${module_path} on every Kafka server.
# NOTE(review): the archive is read from ${kafka_package_path} on the remote
# host — this assumes the file exists at that same path on each Kafka server;
# confirm.
for kafka_host in "${kafka_servers[@]}"; do

  echo "开始解压 ${kafka_host}"

  extract_command="tar -zxf ${kafka_package_path} -C ${module_path}/"
  sshpass -p "${new_user_password}" \
    ssh -o StrictHostKeyChecking=no "${new_user}@${kafka_host}" \
    "${extract_command}"
done

# Rename the unpacked directory (named after the archive) to the fixed name
# "kafka" on every Kafka server.
for kafka_host in "${kafka_servers[@]}"; do
  rename_command="mv ${module_path}/${kafka_version1} ${module_path}/kafka"
  sshpass -p "${new_user_password}" \
    ssh -o StrictHostKeyChecking=no "${new_user}@${kafka_host}" \
    "${rename_command}"
done

# Rewrite the per-broker settings in config/server.properties on every Kafka
# server: broker.id, advertised.listeners, log.dirs and zookeeper.connect.

# Location of the broker configuration file on each Kafka server.
kafka_server_properties="${module_path}/${kafka_version}/config/server.properties"

#######################################
# Runs one command on a remote host as ${new_user} with password login.
# Globals:   new_user, new_user_password (read)
# Arguments: $1 - remote host
#            $2 - command line handed to the remote shell
# Outputs:   stdout/stderr of the remote command
# Returns:   exit status reported by sshpass
#######################################
kafka_remote_exec() {
  sshpass -p "${new_user_password}" \
    ssh -o StrictHostKeyChecking=no "${new_user}@$1" "$2"
}

#######################################
# Checks that a value is a positive decimal line number.
# Arguments: $1 - value to check
# Returns:   0 when $1 consists only of digits and does not start with 0
#######################################
is_line_number() {
  [[ "${1:-}" =~ ^[1-9][0-9]*$ ]]
}

#######################################
# Replaces the first line of server.properties that matches an awk regex.
# Globals:   kafka_server_properties (read)
# Arguments: $1 - remote host
#            $2 - awk regular expression locating the line
#            $3 - full text of the replacement line
# Outputs:   an error message on stderr when no line matches
# Returns:   0 on success, non-zero when the lookup or the edit fails
#######################################
replace_first_matching_line() {
  local host=$1 pattern=$2 new_line=$3
  local line_no

  line_no=$(kafka_remote_exec "${host}" \
    "awk '/${pattern}/{print NR; exit}' ${kafka_server_properties}") || return 1

  # awk prints nothing when no line matches. An empty address would turn the
  # sed script into a bare command that applies to every line of the file,
  # so anything that is not a line number is rejected here.
  if ! is_line_number "${line_no}"; then
    echo "服务器${host}的${kafka_server_properties}中未找到包含 ${pattern} 的配置行。" >&2
    return 1
  fi

  # "c" swaps the addressed line in one edit, so the setting is written even
  # when the matched line is the last line of the file. No sudo is used: the
  # file only needs to be editable by ${new_user}.
  kafka_remote_exec "${host}" \
    "sed -i \"${line_no}c${new_line}\" ${kafka_server_properties}"
}

#######################################
# Builds the zookeeper.connect property line.
# Arguments: $1 - ZooKeeper client port
#            $2 - chroot path appended after the host list
#            $3... - ZooKeeper host names
# Outputs:   "zookeeper.connect=host1:port,host2:port<chroot>" on stdout
#######################################
build_zookeeper_connect() {
  local zk_port=$1 chroot=$2
  shift 2

  local endpoints=() host
  for host in "$@"; do
    endpoints+=("${host}:${zk_port}")
  done

  # "${endpoints[*]}" joins the entries with the first character of IFS.
  local IFS=,
  printf 'zookeeper.connect=%s%s\n' "${endpoints[*]}" "${chroot}"
}

# ZooKeeper connection string shared by all brokers.
port="2181"
zk_path="/kafka"
zookeeper_connect=$(build_zookeeper_connect "${port}" "${zk_path}" "${zookeeper_servers[@]}")

echo "${zookeeper_connect}"

# Brokers are numbered 0, 1, 2, ... in the order of kafka_servers.
broker_id=-1
for server in "${kafka_servers[@]}"; do

  broker_id=$((broker_id + 1))

  # Unique broker id.
  replace_first_matching_line "${server}" 'broker.id' \
    "broker.id=${broker_id}" || exit 1

  # Address and port the broker advertises to clients.
  replace_first_matching_line "${server}" 'advertised.listeners' \
    "advertised.listeners=PLAINTEXT://${server}:9092" || exit 1

  # Data directory of the broker.
  replace_first_matching_line "${server}" 'log.dirs' \
    "log.dirs=${module_path}/${kafka_version}/datas" || exit 1

  # ZooKeeper ensemble used by the broker.
  replace_first_matching_line "${server}" 'zookeeper.connect' \
    "${zookeeper_connect}" || exit 1

done

# Maintain the Kafka entries of the profile script on every Kafka server.
# The three passes below run one after another over all servers.
profile_script="/etc/profile.d/my_env.sh"

# Pass 1: make sure the profile script exists (touch creates it if missing).
for kafka_host in "${kafka_servers[@]}"; do
  echo "开始判断my_env.sh文件是否存在"
  sshpass -p "${new_user_password}" \
    ssh -o StrictHostKeyChecking=no "${new_user}@${kafka_host}" \
    "sudo touch ${profile_script}"
  echo "判断my_env.sh文件是否存在完成"
done

# Pass 2: delete every existing line that mentions KAFKA_HOME.
for kafka_host in "${kafka_servers[@]}"; do
  sshpass -p "${new_user_password}" \
    ssh -o StrictHostKeyChecking=no "${new_user}@${kafka_host}" \
    "sudo sed -i '/KAFKA_HOME/d' ${profile_script}"
done

# Pass 3: append the new entries, one remote call per line. tee also echoes
# each appended line to stdout.
for kafka_host in "${kafka_servers[@]}"; do
  for env_line in "${env_variables[@]}"; do
    sshpass -p "${new_user_password}" \
      ssh -o StrictHostKeyChecking=no "${new_user}@${kafka_host}" \
      "echo '${env_line}' | sudo tee -a ${profile_script}"
  done
done

# Source /etc/profile once on every Kafka server.
# NOTE(review): each call runs in its own SSH session, so the sourced
# environment presumably does not outlive that session — confirm whether this
# step is needed beyond checking that the profile loads without error.
for kafka_host in "${kafka_servers[@]}"; do
  sshpass -p "${new_user_password}" \
    ssh -o StrictHostKeyChecking=no "${new_user}@${kafka_host}" \
    "source /etc/profile"
done

# Start the Kafka broker in daemon mode on every Kafka server, using the
# server.properties of the installation under ${module_path}.
kafka_home_dir="${module_path}/${kafka_version}"
for kafka_host in "${kafka_servers[@]}"; do
  start_command="${kafka_home_dir}/bin/kafka-server-start.sh -daemon ${kafka_home_dir}/config/server.properties"
  sshpass -p "${new_user_password}" \
    ssh -o StrictHostKeyChecking=no "${new_user}@${kafka_host}" \
    "${start_command}"
done

# Reached only when every step above succeeded.
echo "kafka 安装成功"